Skip to content

feat(execution): add execution observer foundation#1097

Open
aryasaatvik wants to merge 6 commits into
RhysSullivan:mainfrom
aryasaatvik:contrib/execution-observer-foundation
Open

feat(execution): add execution observer foundation#1097
aryasaatvik wants to merge 6 commits into
RhysSullivan:mainfrom
aryasaatvik:contrib/execution-observer-foundation

Conversation

@aryasaatvik

@aryasaatvik aryasaatvik commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Add an SDK execution observer contract for execution lifecycle events.
  • Emit observer events from the execution engine for execution start/finish, tool calls, and elicitation interactions.
  • Compose plugin-provided observers into the shared execution stack and local app boot paths.

Notes

This is a domain lifecycle hook for plugin behavior such as history, metrics, indexing, or cache maintenance. It is not an OpenTelemetry replacement.

Non-interrupt observer failures are logged and isolated so they cannot fail the execution being observed. Interrupts still propagate as cancellation.

Implementation Shape

SDK Observer Contract

packages/core/sdk/src/execution-observer.ts

ExecutionEvent =
  | ExecutionStarted
  | ToolCallStarted
  | ToolCallFinished
  | InteractionStarted
  | InteractionResolved
  | ExecutionFinished

ExecutionObserver = {
  handle(event: ExecutionEvent): Effect<void, E>
}

emitExecutionEvent(event)
  -> reads currentExecutionObserver from Effect context
  -> observer.handle(event)

withExecutionObserver(observer)(effect)
  -> installs observer in Effect context
  -> logs and isolates non-interrupt observer failures
  -> preserves interrupt causes as cancellation

composeExecutionObservers(plugins, extensions)
  -> collects plugin.runtime.executionObserver(extension)
  -> dispatches each event to each plugin observer
  -> logs plugin id on observer failure

currentExecutionObserver is private. Callers install observers through withExecutionObserver and emit through emitExecutionEvent, which keeps failure isolation on the public path.

Plugin Hook

packages/core/sdk/src/plugin.ts

definePlugin(() => ({
  runtime: {
    executionObserver: (extension) => ({
      handle: (event) => Effect.Effect<void>
    })
  }
}))

The hook receives the plugin extension so observers can write to plugin-owned stores or services without adding engine dependencies on plugin implementations.

Engine Wiring

packages/core/execution/src/engine.ts

createExecutionEngine({ executor, codeExecutor, observer })
  -> observeExecution = withExecutionObserver(observer ?? noopExecutionObserver)

  -> execute(code, options)
       -> runInlineExecution(code, options).pipe(observeExecution)

  -> executeWithPause(code, options)
       -> startPausableExecution(code, options).pipe(observeExecution)

  -> resume(executionId, response)
       -> resumeExecution(executionId, response).pipe(observeExecution)

The observer is scoped around public engine methods so detached execution fibers inherit the observer context. That matters for pause/resume because an execution can emit InteractionResolved after the original request returns.

Shared Stack Wiring

packages/core/api/src/server/execution-stack.ts

makeExecutionStack(accountId, organizationId, organizationName)
  -> makeScopedExecutor(...)
  -> CodeExecutorProvider
  -> PluginsProvider
  -> composeExecutionObservers(plugins(), executor)
  -> createExecutionEngine({ executor, codeExecutor, observer })
  -> EngineDecorator.decorate(engine, identity)

Local app boot paths follow the same pattern by composing plugin observers and passing the observer into createExecutionEngine.

Event Flow

Inline Execution

engine.execute(code, { onElicitation, trigger })
  -> withExecutionObserver(observer)
     -> emitExecutionEvent(ExecutionStarted)
     -> codeExecutor.execute(code, observedInvoker)
        -> observedInvoker.invoke(toolCall)
           -> emitExecutionEvent(ToolCallStarted)
           -> inner.invoke(toolCall)
           -> emitExecutionEvent(ToolCallFinished)
        -> observeInlineElicitation(ctx)
           -> emitExecutionEvent(InteractionStarted)
           -> onElicitation(ctx)
           -> emitExecutionEvent(InteractionResolved)
     -> emitExecutionEvent(ExecutionFinished)

Pause/Resume Execution

engine.executeWithPause(code, { trigger })
  -> withExecutionObserver(observer)
     -> emitExecutionEvent(ExecutionStarted)
     -> fork detached code execution fiber
        -> tool calls emit ToolCallStarted/ToolCallFinished
        -> elicitation handler emits InteractionStarted
        -> return paused execution to caller
     -> later, engine.resume(executionId, response)
        -> withExecutionObserver(observer)
        -> complete paused Deferred
        -> detached fiber continues
        -> emitExecutionEvent(InteractionResolved)
        -> emitExecutionEvent(ExecutionFinished)

Example Plugin

Plugin observers should handle ExecutionEvent as an exhaustive tagged union. The Match.exhaustive closer makes future event variants a compile-time update point for plugins that care about every lifecycle event.

import { Effect, Match } from "effect";
import { definePlugin, type ExecutionEvent } from "@executor-js/sdk";

const handleExecutionEvent = (history: ExecutionHistoryExtension) =>
  Match.type<ExecutionEvent>().pipe(
    Match.withReturnType<Effect.Effect<void, unknown>>(),
    Match.tag("ExecutionStarted", (event) =>
      history.store.createRun({
        executionId: event.executionId,
        owner: event.owner,
        code: event.code,
        trigger: event.trigger,
        startedAt: event.startedAt,
      }),
    ),
    Match.tag("ToolCallStarted", (event) =>
      history.store.createToolCall({
        executionId: event.executionId,
        toolCallId: event.toolCallId,
        path: event.path,
        args: event.args,
        startedAt: event.startedAt,
      }),
    ),
    Match.tag("ToolCallFinished", (event) =>
      history.store.finishToolCall({
        toolCallId: event.toolCallId,
        status: event.status,
        result: event.result,
        error: event.error,
        completedAt: event.completedAt,
      }),
    ),
    Match.tag("InteractionStarted", (event) =>
      history.store.createInteraction({
        executionId: event.executionId,
        interactionId: event.interactionId,
        context: event.context,
        startedAt: event.startedAt,
      }),
    ),
    Match.tag("InteractionResolved", (event) =>
      history.store.resolveInteraction({
        interactionId: event.interactionId,
        status: event.status,
        response: event.response,
        error: event.error,
        completedAt: event.completedAt,
      }),
    ),
    Match.tag("ExecutionFinished", (event) =>
      history.store.finishRun({
        executionId: event.executionId,
        status: event.status,
        result: event.result,
        error: event.error,
        logs: event.logs,
        completedAt: event.completedAt,
      }),
    ),
    Match.exhaustive,
  );

const executionHistoryPlugin = definePlugin(() => ({
  id: "execution-history",
  storage: () => ({
    executions: executionTable,
    toolCalls: toolCallTable,
    interactions: interactionTable,
  }),
  extension: (ctx) => ({
    store: makeExecutionHistoryStore(ctx.storage),
  }),
  runtime: {
    executionObserver: (history) => ({
      handle: handleExecutionEvent(history),
    }),
  },
}));

Validation

  • bun run --cwd packages/core/sdk test -- execution-observer.test.ts
  • bun run --cwd packages/core/execution test src/engine-observer.test.ts
  • bun run --cwd packages/core/sdk typecheck
  • bun run --cwd packages/core/execution typecheck
  • bun run --cwd packages/core/api typecheck
  • git diff --check upstream/main...HEAD
  • touched-file oxfmt --check
  • touched-file oxlint -c .oxlintrc.jsonc --deny-warnings

Follow-up Scope

This PR intentionally does not add execution history, metrics export, semantic search indexing, or an OpenTelemetry bridge.

@aryasaatvik aryasaatvik marked this pull request as ready for review June 24, 2026 06:15
@aryasaatvik aryasaatvik force-pushed the contrib/execution-observer-foundation branch from ff3ac2f to a74821d Compare June 24, 2026 06:15
@greptile-apps

greptile-apps Bot commented Jun 24, 2026

Copy link
Copy Markdown

Greptile Summary

This PR introduces the execution observer foundation: a typed lifecycle event stream (ExecutionStarted/Finished, ToolCallStarted/Finished, InteractionStarted/Resolved) emitted by the execution engine, a plugin hook (runtime.executionObserver) for opt-in subscription, and composition helpers that fan events to all registered plugin observers while isolating non-interrupt failures and propagating interrupt causes correctly.

  • Observer contract (execution-observer.ts): Data.TaggedClass events with stable _tag discriminants, a Context.Reference-backed dispatch channel with interrupt-aware error isolation via handleExecutionObserverCause, and composeExecutionObservers for sequential per-plugin fan-out.
  • Engine wiring (engine.ts): Both runInlineExecution and startPausableExecution now bracket their full lifecycle with ExecutionStarted/Finished events; tool calls and elicitations are wrapped with matching Started/Finished/Resolved pairs. The daemon fiber inherits the observer context from the forkDetach scope so post-pause emissions flow to the same observer.
  • Integration points (execution-stack.ts, app.ts, main.ts): makeExecutionStack composes plugin observers before passing them to the engine; both local bypass paths receive the same treatment so no execution surface is unobserved.

Confidence Score: 5/5

The change is additive and opt-out by default; executions with no registered observer pay only a no-op context lookup, and the error isolation design prevents any observer from breaking a live execution.

The interrupt-propagation and failure-isolation logic is correct and well-tested. The daemon fiber correctly inherits the observer context through forkDetach. Both execution paths emit a complete and symmetric event sequence. The two findings are narrow edge cases under concurrent interruption that do not affect the common path.

No files require special attention for merge safety.

Important Files Changed

Filename Overview
packages/core/sdk/src/execution-observer.ts New file defining the full observer contract — tagged event classes, Context.Reference-backed dispatch, interrupt-aware error isolation, and plugin fan-out composition. Clean and well-structured.
packages/core/execution/src/engine.ts Wires ExecutionStarted/Finished, ToolCallStarted/Finished, and InteractionStarted/Resolved into both execution paths; daemon fiber inherits observer context correctly via forkDetach within the withExecutionObserver scope.
packages/core/sdk/src/execution-observer.test.ts Covers the core dispatch, failure isolation, interrupt propagation, and no-observer cases well; uses module-level mutable calls array that each test resets manually — works today but fragile under concurrent test runners.
packages/core/execution/src/engine-observer.test.ts Covers the full lifecycle on both the pausable and inline elicitation paths; validates event ordering, ID threading, and no-op behavior when no observer is registered.
packages/core/api/src/server/execution-stack.ts Adds composeExecutionObservers to makeExecutionStack with the correct phantom-type recovery cast; change is minimal and correctly threaded.
packages/core/sdk/src/executor.ts Exposes executor.owner (ownerBinding) on the Executor type so engine machinery can attribute events without re-threading identity.
packages/core/sdk/src/plugin.ts Adds optional runtime.executionObserver hook to PluginSpec; clean additive change with no breaking surface.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Caller
    participant Engine
    participant DaemonFiber
    participant Observer

    Caller->>Engine: executeWithPause(code, options)
    Engine->>Observer: ExecutionStarted
    Engine->>DaemonFiber: forkDetach (inherits Observer context)
    DaemonFiber->>Observer: ToolCallStarted
    DaemonFiber->>Observer: ToolCallFinished
    DaemonFiber->>Observer: InteractionStarted
    DaemonFiber-->>Engine: paused (Deferred)
    Engine-->>Caller: PausedExecution

    Caller->>Engine: resume(executionId, response)
    Engine->>DaemonFiber: Deferred.succeed(response)
    DaemonFiber->>Observer: InteractionResolved
    DaemonFiber->>Observer: ExecutionFinished
    Engine-->>Caller: ExecutionResult
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Caller
    participant Engine
    participant DaemonFiber
    participant Observer

    Caller->>Engine: executeWithPause(code, options)
    Engine->>Observer: ExecutionStarted
    Engine->>DaemonFiber: forkDetach (inherits Observer context)
    DaemonFiber->>Observer: ToolCallStarted
    DaemonFiber->>Observer: ToolCallFinished
    DaemonFiber->>Observer: InteractionStarted
    DaemonFiber-->>Engine: paused (Deferred)
    Engine-->>Caller: PausedExecution

    Caller->>Engine: resume(executionId, response)
    Engine->>DaemonFiber: Deferred.succeed(response)
    DaemonFiber->>Observer: InteractionResolved
    DaemonFiber->>Observer: ExecutionFinished
    Engine-->>Caller: ExecutionResult
Loading

Reviews (5): Last reviewed commit: "refactor(execution): scope observer disp..." | Re-trigger Greptile

Comment thread packages/core/sdk/src/execution-observer.ts Outdated
Comment thread packages/core/api/src/server/execution-stack.ts Outdated
@aryasaatvik aryasaatvik force-pushed the contrib/execution-observer-foundation branch from a5352a8 to df4389d Compare June 24, 2026 07:07
Comment thread packages/core/sdk/src/execution-observer.ts Outdated
aryasaatvik added a commit to aryasaatvik/executor that referenced this pull request Jun 24, 2026
## Summary

- Mirror the upstream execution observer foundation and hardening from
RhysSullivan#1097.
- Keep the dev branch aligned with the scoped observer API:
`withExecutionObserver`, `emitExecutionEvent`, and composed plugin
observers.
- Preserve fork-only execution actor fields while matching upstream
observer failure handling and deterministic dispatch behavior.
- Update dev-only execution observer plugins to use exhaustive Effect
`Match` dispatch for `ExecutionEvent` handling.

## Type Safety Note

Plugin observers handle `ExecutionEvent` as an exhaustive Effect
tagged-union match rather than a raw `switch (event._tag)` or predicate
chain.

`execution-history` now uses `Match.exhaustive` for the full lifecycle
stream, so a future event variant becomes a compile-time update point.
The metrics observers also use exhaustive matching and explicitly ignore
interaction events with no-op cases.

```ts
import { Effect, Match } from "effect";
import { type ExecutionEvent } from "@executor-js/sdk";

const handleExecutionEvent = (history: ExecutionHistoryExtension) =>
  Match.type<ExecutionEvent>().pipe(
    Match.withReturnType<Effect.Effect<void, unknown>>(),
    Match.tag("ExecutionStarted", (event) => history.store.createRun(event)),
    Match.tag("ToolCallStarted", (event) => history.store.createToolCall(event)),
    Match.tag("ToolCallFinished", (event) => history.store.finishToolCall(event)),
    Match.tag("InteractionStarted", (event) => history.store.createInteraction(event)),
    Match.tag("InteractionResolved", (event) => history.store.resolveInteraction(event)),
    Match.tag("ExecutionFinished", (event) => history.store.finishRun(event)),
    Match.exhaustive,
  );
```

## Validation

- `bun run --cwd packages/core/sdk test -- execution-observer.test.ts`
- `bun run --cwd packages/core/execution test --
engine-observer.test.ts`
- `bun run --cwd packages/core/sdk typecheck`
- `bun run --cwd packages/core/execution typecheck`
- `bun run --cwd packages/plugins/execution-history test`
- `bun run --cwd packages/plugins/execution-metrics test`
- `bun run --cwd packages/plugins/execution-history typecheck`
- `bun run --cwd packages/plugins/execution-metrics typecheck`
- touched-file `oxfmt --check`
- touched-file `oxlint -c .oxlintrc.jsonc --deny-warnings`
- `git diff --check`
@aryasaatvik

Copy link
Copy Markdown
Contributor Author

Additional downstream context from my fork: this observer API is the core primitive used by optional execution-history and execution-metrics plugins.

High-level shape:

execution engine
  -> emits ExecutionEvent union
  -> composeExecutionObservers(...)
  -> plugin.runtime.executionObserver(extension)
  -> plugin-owned sink

AST outline of one downstream plugin:

executionHistoryPlugin
  -> pluginStorage: { runs }
  -> storage(deps): makeExecutionHistoryStore(deps)
  -> extension(ctx): { list, get, handleEvent }
  -> runtime.executionObserver(self): makeExecutionHistoryObserver(self)

Fork permalinks:

Call stack:

engine.execute / executeWithPause / resume
  -> emitExecutionEvent(event)
  -> composed observer
  -> executionHistory.handleEvent(event)
  -> buffered plugin-owned storage writer
  -> runs list/detail read model

This is why I think the PR is the right size: it contributes only the lifecycle hook, not the history or metrics products. Once this lands, those plugins can be proposed independently as ordinary consumers of the primitive.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant